package com.example.mq.mqserver.datacenter;

import com.example.mq.common.BinaryTool;
import com.example.mq.common.MqException;
import com.example.mq.mqserver.core.MSGQueue;
import com.example.mq.mqserver.core.Message;

import java.io.*;
import java.util.LinkedList;
import java.util.Scanner;

public class MessageFileManager {
    static public class Stat{
        public int totalCount;
        public int validCount;
    }

    public void init(){

    }

    private String getQueueDir(String queueName){
        return "./data/"+queueName;
    }

    private String getQueueDataPath(String queueName){
        return getQueueDir(queueName)+"/queue_data.txt";
    }

    private String getQueueStatPath(String queueName){
        return getQueueDir(queueName)+"/queue_stat.txt";
    }

    private Stat readStat(String queueName) {
        Stat stat = new Stat();
        try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))) {
            Scanner scanner = new Scanner(inputStream);
            stat.totalCount = scanner.nextInt();
            stat.validCount = scanner.nextInt();
            return stat;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    private void writeStat(String queueName,Stat stat){
        try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {
            PrintWriter printWriter=new PrintWriter(outputStream);
            printWriter.write(stat.totalCount + "\t" + stat.validCount);
            printWriter.flush();
        }catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void createQueueFiles(String queueName) throws IOException {
        File baseDir=new File(getQueueDir(queueName));
        if(!baseDir.exists()){
            boolean ok=baseDir.mkdirs();
            if(!ok){
                throw new IOException("创建目录失败!baseDir="+baseDir.getAbsolutePath());
            }
        }
        File queueDataFile=new File(getQueueDataPath(queueName));
        if(!queueDataFile.exists()){
            boolean ok=queueDataFile.createNewFile();
            if(!ok){
                throw new IOException("创建文件失败！queueDataFile="+queueDataFile.getAbsolutePath());
            }
        }
        File queueStatFile=new File(getQueueStatPath(queueName));
        if(!queueStatFile.exists()){
            boolean ok=queueStatFile.createNewFile();
            if(!ok){
                throw new IOException("创建文件失败！queueStatFile="+queueStatFile.getAbsolutePath());
            }
        }
        Stat stat=new Stat();
        stat.totalCount=0;
        stat.validCount=0;
        writeStat(queueName,stat);
    }

    public void destroyQueueFiles(String queueName) throws IOException {
        File queueDataFile=new File(getQueueDataPath(queueName));
        File queueStatFile=new File(getQueueStatPath(queueName));
        File baseDir=new File(getQueueDir(queueName));
        boolean ok1=queueDataFile.delete();
        boolean ok2=queueStatFile.delete();
        boolean ok3=baseDir.delete();
        if (!ok1||!ok2||!ok3){
            throw new IOException("删除队列目录和文件失败！baseDir="+baseDir.getAbsolutePath());
        }
    }

    public boolean checkFilesExits(String queueName){
        File queueDataFile=new File(getQueueDataPath(queueName));
        File queueStatFile=new File(getQueueStatPath(queueName));
        boolean ok1=queueDataFile.exists();
        boolean ok2=queueStatFile.exists();
        if(ok1&&ok2){
            return true;
        }
        return false;
    }

    public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {
        if (!checkFilesExits(queue.getName())){
            throw new MqException("[MessageFileManager] 队列对应的文件不存在！queueName="+queue.getName());
        }
        byte[] messageBinary= BinaryTool.toBytes(message);
        synchronized (queue){
            File queueDataFile=new File(getQueueDataPath(queue.getName()));
            message.setOffsetBeg(queueDataFile.length()+4);
            message.setOffsetEnd(queueDataFile.length()+4+messageBinary.length);
            try (OutputStream outputStream=new FileOutputStream(queueDataFile,true)){
                try (DataOutputStream dataOutputStream=new DataOutputStream(outputStream)){
                    dataOutputStream.writeInt(messageBinary.length);
                    dataOutputStream.write(messageBinary);
                }
            }
            Stat stat=readStat(queue.getName());
            stat.totalCount+=1;
            stat.validCount+=1;
            writeStat(queue.getName(),stat);
        }
    }

    public void deleteMessage(MSGQueue queue,Message message) throws IOException, ClassNotFoundException {
        synchronized (queue){
            try (RandomAccessFile randomAccessFile= new RandomAccessFile(getQueueDataPath(queue.getName()), "rw")){
                byte[] bufferSrc=new byte[(int)(message.getOffsetEnd()-message.getOffsetBeg())];
                randomAccessFile.seek(message.getOffsetBeg());
                randomAccessFile.read(bufferSrc);
                Message diskMessage= (Message) BinaryTool.fromBytes(bufferSrc);
                diskMessage.setIsValid((byte) 0x0);
                byte[] bufferDest=BinaryTool.toBytes(diskMessage);
                randomAccessFile.seek(message.getOffsetBeg());
                randomAccessFile.write(bufferDest);
            }
            Stat stat=readStat(queue.getName());
            if(stat.validCount>0){
                stat.validCount-=1;
            }
            writeStat(queue.getName(),stat);
        }
    }

    public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
        LinkedList<Message> messages=new LinkedList<>();
        try (InputStream inputStream=new FileInputStream(getQueueDataPath(queueName))){
            try (DataInputStream dataInputStream=new DataInputStream(inputStream)){
                long currentOffset=0;
                while (true){
                    int messageSize=dataInputStream.readInt();
                    byte[] buffer=new byte[messageSize];
                    int actualSize=dataInputStream.read(buffer);
                    if(messageSize!=actualSize){
                        throw new MqException("[MessageFileManager] 文件格式错误！queueName="+queueName);
                    }
                    Message message= (Message) BinaryTool.fromBytes(buffer);
                    if(message.getIsValid()!=0x1){
                        currentOffset+=(4+messageSize);
                        continue;
                    }
                    message.setOffsetBeg(currentOffset+4);
                    message.setOffsetEnd(currentOffset+4+messageSize);
                    currentOffset+=(4+messageSize);
                    messages.add(message);
                }
            }catch (EOFException e){
                System.out.println("[MessageFileManager] 恢复Message数据完成！");
            }
        }
        return messages;
    }

    public boolean checkGC(String queueName){
        Stat stat=readStat(queueName);
        if(stat.totalCount>2000 && (double)stat.validCount/(double)stat.totalCount<0.5){
            return true;
        }
        return false;
    }

    private String getQueueDataNewPath(String queueName){
        return getQueueDir(queueName)+"/queue_data_new.txt";
    }

    public void GC(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {
        synchronized (queue){
            long gcBeg=System.currentTimeMillis();

            File queueDataNewFile=new File(getQueueDataNewPath(queue.getName()));
            if(queueDataNewFile.exists()){
                throw new MqException("[MessageFileManager] GC时发现该队列的queue_data_new已经存在！queueName="+queue.getName());
            }
            boolean ok=queueDataNewFile.createNewFile();
            if(!ok){
                throw new MqException("[MessageFileManager] 创建文件失败！queueDataNewFile="+queueDataNewFile.getAbsolutePath());
            }
            LinkedList<Message> messages=loadAllMessageFromQueue(queue.getName());
            try (OutputStream outputStream=new FileOutputStream(queueDataNewFile)){
                try (DataOutputStream dataOutputStream=new DataOutputStream(outputStream)){
                    for (Message message:messages){
                        byte[] buffer=BinaryTool.toBytes(message);
                        dataOutputStream.writeInt(buffer.length);
                        dataOutputStream.write(buffer);
                    }
                }
            }
            File queueDataOldFile=new File(getQueueDataPath(queue.getName()));
            ok=queueDataOldFile.delete();
            if(!ok){
                throw new MqException("[MessageFileManager] 删除旧的数据文件失败！queueDataOldFile="+queueDataOldFile.getAbsolutePath());
            }
            ok=queueDataNewFile.renameTo(queueDataOldFile);
            if(!ok){
                throw new MqException("[MessageFileManager] 文件重命名失败！queueDataNewFile="+queueDataNewFile.getAbsolutePath()+",queueDataOldFile="+queueDataOldFile.getAbsolutePath());
            }
            Stat stat=readStat(queue.getName());
            stat.totalCount=messages.size();
            stat.validCount=messages.size();
            writeStat(queue.getName(),stat);

            long gcEnd=System.currentTimeMillis();
            System.out.println("[MessageFileManager] GC执行完成！queueName="+queue.getName()+",time="+(gcEnd-gcBeg)+"ms");
        }
    }
}
